package com.itopener.canal.es.core;

import java.util.List;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.itopener.canal.es.core.config.CanalProperties;
import com.itopener.canal.es.core.handler.EventHandlerSupport;

@Component
@EnableConfigurationProperties(CanalProperties.class)
public class CanalClientRunner implements CommandLineRunner {

	private final Logger logger = LoggerFactory.getLogger(CanalClientRunner.class);

	@Resource
	private EventHandlerSupport eventHandlerSupport;
	
	@Autowired
	private CanalProperties canalProperties;
	
	@Value("${canal.batch.size:1000}")
	private int batchSize;

	@Override
	public void run(String... args) throws Exception {
		CanalConnector canalConnector = canalConnector();
		while(true) {
			Message message = canalConnector.getWithoutAck(batchSize);
			long batchId = message.getId();
//			logger.debug("scheduled_batchId=" + batchId);
			try {
				List<Entry> entries = message.getEntries();
				if (batchId != -1 && entries.size() > 0) {
					entries.forEach(entry -> {
						if (entry.getEntryType() == EntryType.ROWDATA) {
							handleCanalEvent(entry);
						}
					});
					canalConnector.ack(batchId);
				} else {
					canalConnector.ack(batchId);
					Thread.sleep(1000);
				}
			} catch (Exception e) {
				logger.error("发送监听事件失败！batchId回滚,batchId=" + batchId, e);
				canalConnector.rollback(batchId);
			}
		}
	}
	
	public CanalConnector canalConnector() {
		CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), canalProperties.getDestination(), canalProperties.getUsername(), canalProperties.getPassword());
		if(connector == null) {
			throw new RuntimeException("can not find canal server, please deploy the cannal server first");
		}
		connector.connect();
		// 指定filter，格式 {database}.{table}，这里不做过滤，过滤操作留给用户
		connector.subscribe();
        // 回滚寻找上次中断的位置
		connector.rollback();
        logger.info("canal client connect success");
        return connector;
	}

	private void handleCanalEvent(Entry entry) {
		try {
			RowChange change = RowChange.parseFrom(entry.getStoreValue());
			eventHandlerSupport.handle(entry, change);
		} catch (InvalidProtocolBufferException e) {
			logger.error("canalEntry_parser_error,根据CanalEntry获取RowChange失败！", e);
			throw new RuntimeException(e);
		}
	}
}
